package com.matrix.async.core;

import java.io.Serializable;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;

/**
 * Task queue that reports queue activity to registered {@link QueueListener}s.
 *
 * <p>
 * Tasks are stored in an unbounded {@link LinkedBlockingQueue}. Every task that
 * is put into the queue is also registered as a listener, and listeners are
 * told about four kinds of events: a task was enqueued, a task was dequeued,
 * the queue ran empty ("lack of task") and the queue reached
 * {@code upperBoundary} ("full of task").
 *
 * <p>
 * {@link #put(List)} additionally applies back-pressure: after enqueuing the
 * batch the calling thread waits until the queue size drops below
 * {@code lowerBoundary}. {@link #put(Task)} never waits.
 *
 * <p>
 * NOTE(review): tasks are registered as listeners when they are enqueued but
 * this class never unregisters them on dequeue; unless callers invoke
 * {@link #removeQueueListener(QueueListener)} the listener list keeps growing.
 * Confirm whether that is intended.
 *
 * @author jiangyouyao
 *
 */
public class TaskQueue implements Serializable, APPComponent, APPQueue {

	// Static: one logger per class, and it stays out of the serialized state of
	// this Serializable class.
	static final Logger log = Logger.getLogger(TaskQueue.class);
	private static final long serialVersionUID = -5828319733039194347L;

	/** Event type: a task was added to the queue. */
	static final int ENQUEUE_EVENT = 1;
	/** Event type: a task was removed from the queue. */
	static final int DEQUEUE_EVENT = 2;
	/** Event type: the queue became empty after a removal. */
	static final int LACK_OF_TASK_EVENT = 3;
	/** Event type: the queue size reached {@code upperBoundary}. */
	static final int FULL_OF_TASK_EVENT = 4;

	/** Registered listeners; mutated and iterated under {@link #listenerLock}. */
	protected final Vector<QueueListener> listeners = new Vector<>();

	/** Backing queue; unbounded, so inserting never blocks for capacity. */
	@SuppressWarnings("rawtypes")
	protected final BlockingQueue queue = new LinkedBlockingQueue<>();

	/**
	 * Returns the backing queue itself (not a copy). Elements added or removed
	 * directly through it bypass listener notification and the boundary logic.
	 *
	 * @return the live backing queue
	 */
	@SuppressWarnings("rawtypes")
	public BlockingQueue getQueue() {
		return queue;
	}

	protected String name = null;

	protected String taskType = "";

	/** Queue size at or above which {@link #FULL_OF_TASK_EVENT} is fired. */
	protected int upperBoundary = 10000;

	/** Queue size below which producers blocked in {@link #put(List)} are woken. */
	protected int lowerBoundary = 10;

	/** Guards listener registration and notification. */
	protected final ReentrantLock listenerLock = new ReentrantLock();
	/** Guards {@link #condition}, on which batch producers wait. */
	protected final ReentrantLock putLock = new ReentrantLock();

	/** Signalled when the queue size drops below {@code lowerBoundary}. */
	protected final Condition condition = putLock.newCondition();

	/**
	 * Creates a queue named {@code "TaskQueue[" + taskType + "]"}.
	 *
	 * @param taskType      type of the tasks held by this queue
	 * @param upperBoundary size at which the "full of task" event is fired
	 * @param lowerBoundary size below which waiting batch producers are woken
	 */
	public TaskQueue(String taskType, int upperBoundary, int lowerBoundary) {
		this.name = "TaskQueue[" + taskType + "]";
		this.taskType = taskType;
		this.upperBoundary = upperBoundary;
		this.lowerBoundary = lowerBoundary;
	}

	/**
	 * Adds a task to the queue and fires the enqueue event; when the queue size
	 * has reached {@code upperBoundary} the "full of task" event is fired too.
	 *
	 * <p>
	 * If the calling thread is interrupted the task is not queued, no event is
	 * fired and the thread's interrupt status is restored.
	 *
	 * @param task the task to enqueue; it is also registered as a listener
	 * @throws NullPointerException if {@code task} is {@code null}
	 */
	@Override
	public void put(Task task) {
		final ReentrantLock lock = this.listenerLock;
		lock.lock();
		try {
			enqueueAndNotify(task);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Registers the task as a listener, inserts it into the queue and fires the
	 * resulting events. Must be called with {@link #listenerLock} held.
	 *
	 * @param task the task to enqueue
	 * @return {@code true} if the task was queued, {@code false} if the thread
	 *         was interrupted and the task was dropped
	 * @throws NullPointerException if {@code task} is {@code null}
	 */
	@SuppressWarnings("unchecked")
	private boolean enqueueAndNotify(Task task) {
		if (task == null) {
			// Fail before touching the listener list so no null listener is left behind.
			throw new NullPointerException("task");
		}
		listeners.add(task);
		try {
			queue.put(task);
		} catch (InterruptedException e) {
			// The task never reached the queue: undo the registration, keep the
			// interrupt visible to the caller and do not announce an enqueue.
			listeners.remove(task);
			Thread.currentThread().interrupt();
			log.error(name + ": interrupted while enqueuing a task; the task was not queued", e);
			return false;
		}
		notifyListeners(ENQUEUE_EVENT, new QueueEvnet(this, task.getID()));
		// The backing queue is unbounded, so its remaining capacity says nothing
		// about upperBoundary; compare the actual size instead.
		if (queue.size() >= upperBoundary) {
			notifyListeners(FULL_OF_TASK_EVENT, new QueueEvnet(this, null));
		}
		return true;
	}

	/**
	 * Dispatches an event to every registered listener. Callers in this class
	 * hold {@link #listenerLock} while invoking it.
	 *
	 * @param type one of the {@code *_EVENT} constants; other values are ignored
	 * @param e    the event passed to each listener
	 */
	private void notifyListeners(int type, QueueEvnet e) {
		// Iterate over a snapshot: a callback may call removeQueueListener on this
		// thread (the lock is reentrant), which would otherwise invalidate the
		// iterator of the live list.
		for (QueueListener l : listeners.toArray(new QueueListener[0])) {
			switch (type) {
			case ENQUEUE_EVENT:
				l.enqueue(e);
				break;
			case DEQUEUE_EVENT:
				l.dequeue(e);
				break;
			case LACK_OF_TASK_EVENT:
				l.lackOfTask(e);
				break;
			case FULL_OF_TASK_EVENT:
				l.fullOfTask(e);
				break;
			default:
				break;
			}
		}
	}

	/**
	 * Adds a batch of tasks, firing the same events as {@link #put(Task)} for
	 * each one, then blocks the caller until the queue size is below
	 * {@code lowerBoundary}.
	 *
	 * <p>
	 * If the thread is interrupted while enqueuing, the remaining tasks are not
	 * queued and the method returns without waiting; the interrupt status is
	 * restored.
	 *
	 * @param tasks the tasks to enqueue, in iteration order
	 */
	@Override
	public void put(List<Task> tasks) {
		final ReentrantLock lock = this.listenerLock;
		lock.lock();
		try {
			for (Task task : tasks) {
				if (!enqueueAndNotify(task)) {
					return;
				}
			}
		} finally {
			lock.unlock();
		}

		// Decide whether the producer thread has to wait.
		awaitBelowLowerBoundary();
	}

	/**
	 * Blocks the calling thread while the queue holds at least
	 * {@code lowerBoundary} tasks. Returns early, with the interrupt status
	 * restored, if the thread is interrupted.
	 */
	private void awaitBelowLowerBoundary() {
		final ReentrantLock lock = this.putLock;
		lock.lock();
		try {
			// Loop rather than a single check: await() may return spuriously, and
			// other producers may have refilled the queue before this thread runs.
			while (queue.size() >= lowerBoundary) {
				log.info("队列中的任务数量为["+queue.size()+"] 大于最小任务数量["+lowerBoundary+"]，线程进入wait状态。。。");
				condition.await();
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error(":" + this.toString() + "\r\n" + e.getMessage(), e);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes and returns the head of the queue without blocking.
	 *
	 * @return the dequeued task, or {@code null} if the queue is empty (in which
	 *         case no event is fired)
	 */
	@Override
	public Task poll() {
		Task task = (Task) queue.poll();
		if (task == null) {
			// Nothing was dequeued, so there is nothing to announce.
			return null;
		}
		afterDequeue(task, true);
		log.info(" | " + task.toString());
		return task;
	}

	/**
	 * Bookkeeping shared by {@link #poll()} and {@link #take()}: wakes waiting
	 * batch producers when the queue is below {@code lowerBoundary}, then fires
	 * the dequeue event and, if the queue is now empty, the "lack of task" event.
	 *
	 * @param task         the task that was just removed; must not be null
	 * @param logWhenEmpty whether to write the info log line before the
	 *                     "lack of task" event
	 */
	private void afterDequeue(Task task, boolean logWhenEmpty) {
		if (queue.size() < lowerBoundary) {
			signalNotFull();
		}
		final ReentrantLock lock = this.listenerLock;
		lock.lock();
		try {
			notifyListeners(DEQUEUE_EVENT, new QueueEvnet(this, task.getID()));
			if (queue.size() == 0) {
				if (logWhenEmpty) {
					log.info("对列不满");
				}
				notifyListeners(LACK_OF_TASK_EVENT, new QueueEvnet(this, null));
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Wakes every producer thread waiting in {@link #put(List)}.
	 */
	private void signalNotFull() {
		final ReentrantLock lock = this.putLock;
		lock.lock();
		try {
			condition.signalAll();
			log.info("队列中任务数量为[" + queue.size() + "]低于最小值[" + lowerBoundary + "]，唤醒生产者线程");
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes and returns the head of the queue, blocking the calling thread
	 * while the queue is empty.
	 *
	 * @return the dequeued task
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	@Override
	public Task take() throws InterruptedException {
		Task task = (Task) queue.take();
		afterDequeue(task, false);
		return task;
	}

	/**
	 * Unregisters a listener (its first occurrence in the listener list).
	 *
	 * @param l the listener to remove
	 */
	public void removeQueueListener(QueueListener l) {
		final ReentrantLock lock = this.listenerLock;
		lock.lock();
		try {
			listeners.remove(l);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * @return the number of tasks currently in the queue
	 */
	@Override
	public int size() {
		return queue.size();
	}

	/**
	 * @return the queue name, {@code "TaskQueue[" + taskType + "]"} as set by the
	 *         constructor
	 */
	@Override
	public String getName() {
		return name;
	}

	/**
	 * @return the task type this queue was created for
	 */
	@Override
	public String getTaskType() {
		return taskType;
	}

}
